Skip to content

fix(execution): mirror observer upstream hardening#59

Merged
aryasaatvik merged 4 commits into
devfrom
fix/execution-observer-upstream-parity
Jun 24, 2026
Merged

fix(execution): mirror observer upstream hardening#59
aryasaatvik merged 4 commits into
devfrom
fix/execution-observer-upstream-parity

Conversation

@aryasaatvik

@aryasaatvik aryasaatvik commented Jun 24, 2026

Copy link
Copy Markdown
Owner

Summary

  • Mirror the upstream execution observer foundation and hardening from feat(execution): add execution observer foundation RhysSullivan/executor#1097.
  • Keep the dev branch aligned with the scoped observer API: withExecutionObserver, emitExecutionEvent, and composed plugin observers.
  • Preserve fork-only execution actor fields while matching upstream observer failure handling and deterministic dispatch behavior.
  • Update dev-only execution observer plugins to use exhaustive Effect Match dispatch for ExecutionEvent handling.

Type Safety Note

Plugin observers handle ExecutionEvent as an exhaustive Effect tagged-union match rather than a raw switch (event._tag) or predicate chain.

execution-history now uses Match.exhaustive for the full lifecycle stream, so a future event variant becomes a compile-time update point. The metrics observers also use exhaustive matching and explicitly ignore interaction events with no-op cases.

import { Effect, Match } from "effect";
import { type ExecutionEvent } from "@executor-js/sdk";

const handleExecutionEvent = (history: ExecutionHistoryExtension) =>
  Match.type<ExecutionEvent>().pipe(
    Match.withReturnType<Effect.Effect<void, unknown>>(),
    Match.tag("ExecutionStarted", (event) => history.store.createRun(event)),
    Match.tag("ToolCallStarted", (event) => history.store.createToolCall(event)),
    Match.tag("ToolCallFinished", (event) => history.store.finishToolCall(event)),
    Match.tag("InteractionStarted", (event) => history.store.createInteraction(event)),
    Match.tag("InteractionResolved", (event) => history.store.resolveInteraction(event)),
    Match.tag("ExecutionFinished", (event) => history.store.finishRun(event)),
    Match.exhaustive,
  );

Validation

  • bun run --cwd packages/core/sdk test -- execution-observer.test.ts
  • bun run --cwd packages/core/execution test -- engine-observer.test.ts
  • bun run --cwd packages/core/sdk typecheck
  • bun run --cwd packages/core/execution typecheck
  • bun run --cwd packages/plugins/execution-history test
  • bun run --cwd packages/plugins/execution-metrics test
  • bun run --cwd packages/plugins/execution-history typecheck
  • bun run --cwd packages/plugins/execution-metrics typecheck
  • touched-file oxfmt --check
  • touched-file oxlint -c .oxlintrc.jsonc --deny-warnings
  • git diff --check

@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown

Greptile Summary

This PR migrates the execution observer system from a closure-based ignoreExecutionObserverErrors wrapper to a scoped Context.Reference pattern (withExecutionObserver / emitExecutionEvent), enabling detached fibers to inherit the observer context across pause/resume boundaries. It also hardens error handling so interrupt causes re-propagate from observers rather than being silently swallowed, and upgrades all plugin observer handlers from Predicate.isTagged if-chains to exhaustive Match.type patterns for compile-time safety.

  • Observer context scoping: withExecutionObserver wraps each public engine method with Effect.provideService, so forked fibers inside executeWithPause automatically inherit the observer — eliminating the previous gap where pause/resume events could miss the observer.
  • Interrupt propagation: non-interrupt observer failures are now logged instead of swallowed; Cause.hasInterrupts causes are re-raised, with new tests verifying both the scoped and composed-observer paths.
  • Sequential dispatch: composeExecutionObservers switches from { concurrency: "unbounded" } to sequential Effect.forEach to guarantee deterministic plugin ordering; an interrupting plugin now stops subsequent plugins from receiving the event.

Confidence Score: 5/5

Safe to merge — the context-scoped observer wiring is architecturally sound, detached fibers correctly inherit the observer at fork time, and interrupt re-propagation is explicitly covered by new tests.

The core change — replacing an ad-hoc closure with Effect's Context.Reference pattern — is straightforward and the type signatures enforce correctness end-to-end. The sequential dispatch trade-off is deliberate, tested, and the interrupt short-circuit behaviour is validated. Only a stale JSDoc comment was found; no logic defects or runtime regressions are present.

packages/plugins/execution-metrics/src/cloudflare/index.ts has a stale JSDoc describing the old "errors are swallowed" contract; worth updating before merging to keep observer contract documentation accurate for future implementors.

Important Files Changed

Filename Overview
packages/core/sdk/src/execution-observer.ts Introduces Context.Reference scoped observer, emitExecutionEvent, withExecutionObserver, and updated composeExecutionObservers with sequential dispatch and interrupt re-propagation. Clean implementation with correct type erasure.
packages/core/execution/src/engine.ts Replaces emit closure with emitExecutionEvent calls and wraps public methods with observeExecution; detached fiber correctly inherits observer context at fork time inside the provideService scope.
packages/core/sdk/src/execution-observer.test.ts Adds tests for scoped observer emission, sequential ordering with async boundary, interrupt propagation in both scoped and composed paths, and the no-op case. Coverage is thorough.
packages/plugins/execution-history/src/sdk/store.ts handleEvent migrated from Predicate.isTagged if-chain with fallthrough to Match.exhaustive — eliminates the silent no-op for future event variants.
packages/plugins/execution-metrics/src/cloudflare/index.ts Migrated to Match.exhaustive; InteractionStarted/Resolved now explicitly no-oped. JSDoc still describes the old "errors are swallowed" contract, which has changed.
packages/plugins/execution-metrics/src/sdk/observer.ts Migrated from Predicate.isTagged if-chain to Match.exhaustive with explicit no-ops for Interaction events. Clean and correct.
packages/core/sdk/src/index.ts Replaces ignoreExecutionObserverErrors export with emitExecutionEvent and withExecutionObserver. Breaking change, correctly documented in PR.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller
    participant Engine
    participant ObserverCtx as Context.Reference<br/>(currentExecutionObserver)
    participant ComposedObs as composeExecutionObservers
    participant PluginA as Plugin Observer A
    participant PluginB as Plugin Observer B
    participant DetachedFiber as Detached Fiber<br/>(pausable exec)

    Caller->>Engine: executeWithPause(code, options)
    Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>provideService to entire effect scope"
    Engine->>ObserverCtx: emitExecutionEvent(ExecutionStarted)
    ObserverCtx->>ComposedObs: handle(ExecutionStarted)
    ComposedObs->>PluginA: handle(event) [sequential]
    PluginA-->>ComposedObs: Effect.void or logs error
    ComposedObs->>PluginB: handle(event) [sequential, only if A did not interrupt]
    PluginB-->>ComposedObs: Effect.void
    Engine->>DetachedFiber: Effect.forkDetach (inherits observer context)
    Engine-->>Caller: pause handle

    Caller->>Engine: resume(executionId, response)
    Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>wraps resume scope"
    DetachedFiber->>ObserverCtx: "emitExecutionEvent(ExecutionFinished)<br/>(from inherited context)"
    ObserverCtx->>ComposedObs: handle(ExecutionFinished)
    Note over ComposedObs,PluginB: interrupt from any observer<br/>stops remaining dispatch
    Engine-->>Caller: ExecutionResult
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller
    participant Engine
    participant ObserverCtx as Context.Reference<br/>(currentExecutionObserver)
    participant ComposedObs as composeExecutionObservers
    participant PluginA as Plugin Observer A
    participant PluginB as Plugin Observer B
    participant DetachedFiber as Detached Fiber<br/>(pausable exec)

    Caller->>Engine: executeWithPause(code, options)
    Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>provideService to entire effect scope"
    Engine->>ObserverCtx: emitExecutionEvent(ExecutionStarted)
    ObserverCtx->>ComposedObs: handle(ExecutionStarted)
    ComposedObs->>PluginA: handle(event) [sequential]
    PluginA-->>ComposedObs: Effect.void or logs error
    ComposedObs->>PluginB: handle(event) [sequential, only if A did not interrupt]
    PluginB-->>ComposedObs: Effect.void
    Engine->>DetachedFiber: Effect.forkDetach (inherits observer context)
    Engine-->>Caller: pause handle

    Caller->>Engine: resume(executionId, response)
    Engine->>ObserverCtx: "withExecutionObserver(config.observer)<br/>wraps resume scope"
    DetachedFiber->>ObserverCtx: "emitExecutionEvent(ExecutionFinished)<br/>(from inherited context)"
    ObserverCtx->>ComposedObs: handle(ExecutionFinished)
    Note over ComposedObs,PluginB: interrupt from any observer<br/>stops remaining dispatch
    Engine-->>Caller: ExecutionResult
Loading

Reviews (4): Last reviewed commit: "refactor(plugins): match execution event..." | Re-trigger Greptile

Comment thread packages/core/sdk/src/execution-observer.ts
Comment thread packages/core/sdk/src/execution-observer.ts Outdated
@aryasaatvik aryasaatvik merged commit be02315 into dev Jun 24, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant